# Source Generated with Decompyle++
# File: in.pyc (Python 2.6)

'''Interface to GNU Privacy Guard (GnuPG)
-
- GnuPGInterface is a Python module to interface with GnuPG.
- It concentrates on interacting with GnuPG via filehandles,
- providing access to control GnuPG via versatile and extensible means.
-
- This module is based on GnuPG::Interface, a Perl module by the same author.
-
- Normally, using this module will involve creating a
- GnuPG object, setting some options in its \'options\' data member
- (which is of type Options), creating some pipes
- to talk with GnuPG, and then calling the run() method, which will
- connect those pipes to the GnuPG process. run() returns a
- Process object, which contains the filehandles to talk to GnuPG with.
-
- Example code:
-
- >>> import GnuPGInterface
- >>>
- >>> plaintext = "Three blind mice"
- >>> passphrase = "This is the passphrase"
- >>>
- >>> gnupg = GnuPGInterface.GnuPG()
- >>> gnupg.options.armor = 1
- >>> gnupg.options.meta_interactive = 0
- >>> gnupg.options.extra_args.append(\'--no-secmem-warning\')
- >>>
- >>> # Normally we might specify something in
- >>> # gnupg.options.recipients, like
- >>> # gnupg.options.recipients = [ \'0xABCD1234\', \'bob@foo.bar\' ]
- >>> # but since we\'re doing symmetric-only encryption, it\'s not needed.
- >>> # If you are doing standard, public-key encryption, using
- >>> # --encrypt, you will need to specify recipients before
- >>> # calling gnupg.run()
- >>>
- >>> # First we\'ll encrypt the test_text input symmetrically
- >>> p1 = gnupg.run([\'--symmetric\'],
- ... create_fhs=[\'stdin\', \'stdout\', \'passphrase\'])
- >>>
- >>> p1.handles[\'passphrase\'].write(passphrase)
- >>> p1.handles[\'passphrase\'].close()
- >>>
- >>> p1.handles[\'stdin\'].write(plaintext)
- >>> p1.handles[\'stdin\'].close()
- >>>
- >>> ciphertext = p1.handles[\'stdout\'].read()
- >>> p1.handles[\'stdout\'].close()
- >>>
- >>> # process cleanup
- >>> p1.wait()
- >>>
- >>> # Now we\'ll decrypt what we just encrypted,
- >>> # using the convenience method to get the
- >>> # passphrase to GnuPG
- >>> gnupg.passphrase = passphrase
- >>>
- >>> p2 = gnupg.run([\'--decrypt\'], create_fhs=[\'stdin\', \'stdout\'])
- >>>
- >>> p2.handles[\'stdin\'].write(ciphertext)
- >>> p2.handles[\'stdin\'].close()
- >>>
- >>> decrypted_plaintext = p2.handles[\'stdout\'].read()
- >>> p2.handles[\'stdout\'].close()
- >>>
- >>> # process cleanup
- >>> p2.wait()
- >>>
- >>> # Our decrypted plaintext:
- >>> decrypted_plaintext
- \'Three blind mice\'
- >>>
- >>> # ...and see it\'s the same as what we originally encrypted
- >>> assert decrypted_plaintext == plaintext, "GnuPG decrypted output does not match original input"
- >>>
- >>>
- >>> ##################################################
- >>> # Now let\'s try using run()\'s attach_fhs parameter
- >>>
- >>> # we\'re assuming we\'re running on a unix...
- >>> input = open(\'/etc/motd\')
- >>>
- >>> p1 = gnupg.run([\'--symmetric\'], create_fhs=[\'stdout\'],
- ... attach_fhs={\'stdin\': input})
- >>>
- >>> # GnuPG will read the stdin from /etc/motd
- >>> ciphertext = p1.handles[\'stdout\'].read()
- >>>
- >>> # process cleanup
- >>> p1.wait()
- >>>
- >>> # Now let\'s run the output through GnuPG
- >>> # We\'ll write the output to a temporary file,
- >>> import tempfile
- >>> temp = tempfile.TemporaryFile()
- >>>
- >>> p2 = gnupg.run([\'--decrypt\'], create_fhs=[\'stdin\'],
- ... attach_fhs={\'stdout\': temp})
- >>>
- >>> # give GnuPG our encrypted stuff from the first run
- >>> p2.handles[\'stdin\'].write(ciphertext)
- >>> p2.handles[\'stdin\'].close()
- >>>
- >>> # process cleanup
- >>> p2.wait()
- >>>
- >>> # rewind the tempfile and see what GnuPG gave us
- >>> temp.seek(0)
- >>> decrypted_plaintext = temp.read()
- >>>
- >>> # compare what GnuPG decrypted with our original input
- >>> input.seek(0)
- >>> input_data = input.read()
- >>>
- >>> assert decrypted_plaintext == input_data, "GnuPG decrypted output does not match original input"
-
- To do things like public-key encryption, simply do something
- like:
-
- gnupg.passphrase = \'My passphrase\'
- gnupg.options.recipients = [ \'bob@foobar.com\' ]
- gnupg.run( [\'--sign\', \'--encrypt\'], create_fhs=..., attach_fhs=...)
-
- Here is an example of subclassing GnuPGInterface.GnuPG,
- so that it has an encrypt_string() method that returns
- ciphertext.
-
- >>> import GnuPGInterface
- >>>
- >>> class MyGnuPG(GnuPGInterface.GnuPG):
- ...
- ... def __init__(self):
- ... GnuPGInterface.GnuPG.__init__(self)
- ... self.setup_my_options()
- ...
- ... def setup_my_options(self):
- ... self.options.armor = 1
- ... self.options.meta_interactive = 0
- ... self.options.extra_args.append(\'--no-secmem-warning\')
- ...
- ... def encrypt_string(self, string, recipients):
- ... self.options.recipients = recipients # a list!
- ...
- ... proc = self.run([\'--encrypt\'], create_fhs=[\'stdin\', \'stdout\'])
- ...
- ... proc.handles[\'stdin\'].write(string)
- ... proc.handles[\'stdin\'].close()
- ...
- ... output = proc.handles[\'stdout\'].read()
- ... proc.handles[\'stdout\'].close()
- ...
- ... proc.wait()
- ... return output
- ...
- >>> gnupg = MyGnuPG()
- >>> ciphertext = gnupg.encrypt_string("The secret", [\'0x260C4FA3\'])
- >>>
- >>> # just a small sanity test here for doctest
- >>> import types
- >>> assert isinstance(ciphertext, types.StringType), "What GnuPG gave back is not a string!"
-
- Here is an example of generating a key:
- >>> import GnuPGInterface
- >>> gnupg = GnuPGInterface.GnuPG()
- >>> gnupg.options.meta_interactive = 0
- >>>
- >>> # We will be creative and use the logger filehandle to capture
- >>> # what GnuPG says this time, instead of stderr; no stdout to listen to,
- >>> # but we capture logger to suppress the dry-run command.
- >>> # We also have to capture stdout since otherwise doctest complains;
- >>> # Normally you can let stdout through when generating a key.
- >>>
- >>> proc = gnupg.run([\'--gen-key\'], create_fhs=[\'stdin\', \'stdout\',
- ... \'logger\'])
- >>>
- >>> proc.handles[\'stdin\'].write(\'\'\'Key-Type: DSA
- ... Key-Length: 1024
- ... # We are only testing syntax this time, so dry-run
- ... %dry-run
- ... Subkey-Type: ELG-E
- ... Subkey-Length: 1024
- ... Name-Real: Joe Tester
- ... Name-Comment: with stupid passphrase
- ... Name-Email: joe@foo.bar
- ... Expire-Date: 2y
- ... Passphrase: abc
- ... %pubring foo.pub
- ... %secring foo.sec
- ... \'\'\')
- >>>
- >>> proc.handles[\'stdin\'].close()
- >>>
- >>> report = proc.handles[\'logger\'].read()
- >>> proc.handles[\'logger\'].close()
- >>>
- >>> proc.wait()
-
-
- COPYRIGHT:
-
- Copyright (C) 2001 Frank J. Tobin, ftobin@neverending.org
-
- LICENSE:
-
- This library is free software; you can redistribute it and/or
- modify it under the terms of the GNU Lesser General Public
- License as published by the Free Software Foundation; either
- version 2.1 of the License, or (at your option) any later version.
-
- This library is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- Lesser General Public License for more details.
-
- You should have received a copy of the GNU Lesser General Public
- License along with this library; if not, write to the Free Software
- Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
- or see http://www.gnu.org/copyleft/lesser.html
- '''
import fcntl
import os
import sys
__author__ = 'Frank J. Tobin, ftobin@neverending.org'
__version__ = '0.3.2'
__revision__ = '$Id: GnuPGInterface.py,v 1.22 2002/01/11 20:22:04 ftobin Exp $'

# Names of the three standard streams; these are looked up as attributes
# of the sys module (sys.stdin, sys.__stdin__, ...).
_stds = ['stdin', 'stdout', 'stderr']

# Mode in which the *parent* opens its end of each filehandle:
# 'w' means the parent writes and GnuPG reads, 'r' is the reverse.
_fd_modes = {
    'stdin': 'w',
    'stdout': 'r',
    'stderr': 'r',
    'passphrase': 'w',
    'command': 'w',
    'logger': 'r',
    'status': 'r',
}

# GnuPG command-line option used to hand over the descriptor number of
# each non-standard filehandle.
_fd_options = {
    'passphrase': '--passphrase-fd',
    'logger': '--logger-fd',
    'status': '--status-fd',
    'command': '--command-fd',
}
-
class GnuPG:
    """Class instances represent GnuPG.

    Instance attributes of a GnuPG object are:

    * call -- string to call GnuPG with.  Defaults to "gpg"

    * passphrase -- Since it is a common operation
      to pass in a passphrase to GnuPG,
      and working with the passphrase filehandle mechanism directly
      can be mundane, if set, the passphrase attribute
      works in a special manner.  If the passphrase attribute is set,
      and no passphrase file object is sent in to run(),
      then the GnuPG instance will take care of sending the passphrase
      to the GnuPG executable instead of having the user send it in
      manually.

    * options -- Object of type GnuPGInterface.Options.
      Attribute-setting in options determines
      the command-line options used when calling GnuPG.
    """

    def __init__(self):
        self.call = 'gpg'
        self.passphrase = None
        self.options = Options()

    def run(self, gnupg_commands, args=None, create_fhs=None, attach_fhs=None):
        """Call GnuPG with the list of string commands gnupg_commands,
        complete with prefixing dashes.

        For example, gnupg_commands could be '["--sign", "--encrypt"]'.
        Returns a GnuPGInterface.Process object.

        args is an optional list of GnuPG command arguments (not options),
        such as keyIDs to export, filenames to process, etc.

        create_fhs is an optional list of GnuPG filehandle
        names that will be set as keys of the returned Process object's
        'handles' attribute.  The generated filehandles can be used
        to communicate with GnuPG via standard input, standard output,
        the status-fd, passphrase-fd, etc.

        Valid GnuPG filehandle names are:
          * stdin
          * stdout
          * stderr
          * status
          * passphrase
          * command
          * logger

        The purpose of each filehandle is described in the GnuPG
        documentation.

        attach_fhs is an optional dictionary with GnuPG filehandle
        names mapping to opened files.  GnuPG will read or write
        to the file accordingly.  For example, if 'my_file' is an
        opened file and 'attach_fhs[stdin] is my_file', then GnuPG
        will read its standard input from my_file.  This is useful
        if you want GnuPG to read/write to/from an existing file.
        For instance:

            f = open("encrypted.gpg")
            gnupg.run(["--decrypt"], attach_fhs={'stdin': f})

        Using attach_fhs also helps avoid system buffering
        issues that can arise when using create_fhs, which
        can cause the process to deadlock.

        If not mentioned in create_fhs or attach_fhs,
        GnuPG filehandles which are a std* (stdin, stdout, stderr)
        are defaulted to the running process' version of the handle.
        Otherwise, that type of handle is simply not used when calling
        GnuPG.  For example, if you do not care about getting data from
        GnuPG's status filehandle, simply do not specify it.

        run() returns a Process() object which has a 'handles'
        attribute, a dictionary mapping from the handle name
        (such as 'stdin' or 'stdout') to the respective
        newly-created FileObject connected to the running GnuPG process.
        For instance, if the call was

            process = gnupg.run(["--decrypt"], create_fhs=['stdin'])

        after run returns 'process.handles["stdin"]'
        is a FileObject connected to GnuPG's standard input,
        and can be written to.

        The args, create_fhs and attach_fhs containers passed in are
        copied, never modified.
        """
        # Work on private copies: the defaults and the implicit
        # 'passphrase' handle added below must not leak into the
        # caller's list/dict (a reused list would otherwise keep
        # 'passphrase' and disable the automatic handling next time).
        args = [] if args is None else list(args)
        create_fhs = [] if create_fhs is None else list(create_fhs)
        attach_fhs = {} if attach_fhs is None else dict(attach_fhs)

        # Standard streams nobody asked for are inherited from this process.
        for std in _stds:
            if std not in attach_fhs and std not in create_fhs:
                attach_fhs[std] = getattr(sys, std)

        handle_passphrase = (self.passphrase is not None
                             and 'passphrase' not in attach_fhs
                             and 'passphrase' not in create_fhs)
        if handle_passphrase:
            create_fhs.append('passphrase')

        process = self._attach_fork_exec(gnupg_commands, args,
                                         create_fhs, attach_fhs)

        if handle_passphrase:
            # Feed the stored passphrase ourselves and hide the handle
            # from the caller, who never requested it.
            passphrase_fh = process.handles['passphrase']
            passphrase_fh.write(self.passphrase)
            passphrase_fh.close()
            del process.handles['passphrase']

        return process

    def _attach_fork_exec(self, gnupg_commands, args, create_fhs, attach_fhs):
        """This is like run(), but without the passphrase-helping
        (note that run() calls this).

        Raises KeyError for a filehandle name that is not in _fd_modes and
        ValueError for a name present in both create_fhs and attach_fhs.
        Returns the Process object filled in by _as_parent().
        """
        process = Process()

        # Validate everything before any pipe is created, so that a bad
        # request cannot leave freshly opened descriptors behind.
        for fh_name in list(create_fhs) + list(attach_fhs):
            if fh_name not in _fd_modes:
                raise KeyError(
                    "unrecognized filehandle name '%s'; must be one of %s"
                    % (fh_name, list(_fd_modes)))

        for fh_name in create_fhs:
            if fh_name in attach_fhs:
                raise ValueError(
                    "cannot have filehandle '%s' in both create_fhs and attach_fhs"
                    % fh_name)

        for fh_name in create_fhs:
            read_end, write_end = os.pipe()
            # The parent keeps the end matching its mode in _fd_modes;
            # the other end goes to the child.
            if _fd_modes[fh_name] == 'w':
                parent_end, child_end = write_end, read_end
            else:
                parent_end, child_end = read_end, write_end
            process._pipes[fh_name] = Pipe(parent_end, child_end, 0)

        # Attached files are used directly: both "ends" are the same fd.
        for fh_name, fh in attach_fhs.items():
            process._pipes[fh_name] = Pipe(fh.fileno(), fh.fileno(), 1)

        process.pid = os.fork()
        if process.pid == 0:
            # Child.  _as_child() only returns (by raising) when the exec
            # failed; never let the child fall back into the caller's code.
            try:
                self._as_child(process, gnupg_commands, args)
            except Exception as exc:
                sys.stderr.write("GnuPGInterface: cannot execute '%s': %s\n"
                                 % (self.call, exc))
                sys.stderr.flush()
            finally:
                os._exit(127)

        return self._as_parent(process)

    def _as_parent(self, process):
        """Stuff run after forking in parent"""
        for name, pipe in process._pipes.items():
            if not pipe.direct:
                # Created pipe: drop the child's end and wrap ours in a
                # file object opened in the mode from _fd_modes.
                os.close(pipe.child)
                process.handles[name] = os.fdopen(pipe.parent, _fd_modes[name])

        del process._pipes
        return process

    def _as_child(self, process, gnupg_commands, args):
        """Stuff run after forking in child"""
        # Put the child's ends onto the descriptors of the interpreter's
        # original standard streams (sys.__stdin__ etc.).
        for std in _stds:
            pipe = process._pipes[std]
            os.dup2(pipe.child, getattr(sys, '__%s__' % std).fileno())

        fd_args = []
        for name, pipe in process._pipes.items():
            if name not in _stds:
                if pipe.direct:
                    # Clear the descriptor flags (including close-on-exec)
                    # so the attached file survives the exec.
                    fcntl.fcntl(pipe.child, fcntl.F_SETFD, 0)
                # Tell GnuPG which descriptor number serves this handle.
                fd_args.extend([_fd_options[name], '%d' % pipe.child])

            if not pipe.direct:
                os.close(pipe.parent)

        command = ([self.call] + fd_args + self.options.get_args()
                   + list(gnupg_commands) + list(args))
        os.execvp(command[0], command)
-
-
-
class Pipe:
    """simple struct holding stuff about pipes we use"""

    def __init__(self, parent, child, direct):
        # parent -- file descriptor stored as the parent's end
        # child  -- file descriptor stored as the child's end
        # direct -- flag stored as given by the caller
        self.parent = parent
        self.child = child
        self.direct = direct
-
-
-
class Options:
    """Objects of this class encompass options passed to GnuPG.
    This class is responsible for determining command-line arguments
    which are based on options.  It can be said that a GnuPG
    object has-a Options object in its options attribute.

    Attributes which correlate directly to GnuPG options:

    Each option here defaults to false or None, and is described in
    GnuPG documentation.

    Booleans (set these attributes to booleans)

      * armor
      * no_greeting
      * verbose
      * no_verbose
      * quiet
      * batch
      * always_trust
      * rfc1991
      * openpgp
      * force_v3_sigs
      * no_options
      * textmode

    Strings (set these attributes to strings)

      * homedir
      * default_key
      * comment
      * compress_algo
      * options

    Lists (set these attributes to lists)

      * recipients  (***NOTE*** plural of 'recipient')
      * encrypt_to

    Meta options

    Meta options are options provided by this module that do
    not correlate directly to any GnuPG option by name,
    but are rather a bundle of options used to accomplish
    a specific goal, such as obtaining compatibility with PGP 5.
    The actual arguments each of these reflects may change with time.
    Each defaults to false unless otherwise specified.

    meta_pgp_5_compatible -- If true, arguments are generated to try
    to be compatible with PGP 5.x.

    meta_pgp_2_compatible -- If true, arguments are generated to try
    to be compatible with PGP 2.x.

    meta_interactive -- If false, arguments are generated to try to
    help the using program use GnuPG in a non-interactive
    environment, such as CGI scripts.  Default is true.

    extra_args -- Extra option arguments may be passed in
    via the attribute extra_args, a list.

    >>> import GnuPGInterface
    >>>
    >>> gnupg = GnuPGInterface.GnuPG()
    >>> gnupg.options.armor = 1
    >>> gnupg.options.recipients = ['Alice', 'Bob']
    >>> gnupg.options.extra_args = ['--no-secmem-warning']
    >>>
    >>> # no need for users to call this normally; just for show here
    >>> gnupg.options.get_args()
    ['--armor', '--recipient', 'Alice', '--recipient', 'Bob', '--no-secmem-warning']
    """

    # (attribute, GnuPG option) pairs.  The order of each table is the
    # order in which the arguments are emitted by get_standard_args().

    # Emitted as "option value" when the attribute is not None.
    _VALUE_OPTIONS = (
        ('homedir', '--homedir'),
        ('options', '--options'),
        ('comment', '--comment'),
        ('compress_algo', '--compress-algo'),
        ('default_key', '--default-key'),
    )

    # Emitted as a bare option when the attribute is true.
    _FLAG_OPTIONS = (
        ('no_options', '--no-options'),
        ('armor', '--armor'),
        ('textmode', '--textmode'),
        ('no_greeting', '--no-greeting'),
        ('verbose', '--verbose'),
        ('no_verbose', '--no-verbose'),
        ('quiet', '--quiet'),
        ('batch', '--batch'),
        ('always_trust', '--always-trust'),
        ('force_v3_sigs', '--force-v3-sigs'),
        ('rfc1991', '--rfc1991'),
        ('openpgp', '--openpgp'),
    )

    # Emitted as "option item" once per list element.
    _LIST_OPTIONS = (
        ('recipients', '--recipient'),
        ('encrypt_to', '--encrypt-to'),
    )

    def __init__(self):
        # booleans
        self.armor = 0
        self.no_greeting = 0
        self.verbose = 0
        self.no_verbose = 0
        self.quiet = 0
        self.batch = 0
        self.always_trust = 0
        self.rfc1991 = 0
        self.openpgp = 0
        self.force_v3_sigs = 0
        self.no_options = 0
        self.textmode = 0

        # meta-option switches; only meta_interactive defaults to true
        self.meta_pgp_5_compatible = 0
        self.meta_pgp_2_compatible = 0
        self.meta_interactive = 1

        # strings
        self.homedir = None
        self.default_key = None
        self.comment = None
        self.compress_algo = None
        self.options = None

        # lists
        self.encrypt_to = []
        self.recipients = []
        self.extra_args = []

    def get_args(self):
        """Generate a list of GnuPG arguments based upon attributes.

        The result is the meta arguments, then the standard arguments,
        then a copy of extra_args.
        """
        return (self.get_meta_args() + self.get_standard_args()
                + list(self.extra_args))

    def get_standard_args(self):
        """Generate a list of standard, non-meta or extra arguments"""
        args = []

        for attr, flag in self._VALUE_OPTIONS:
            value = getattr(self, attr)
            if value is not None:
                args.extend([flag, value])

        for attr, flag in self._FLAG_OPTIONS:
            if getattr(self, attr):
                args.append(flag)

        for attr, flag in self._LIST_OPTIONS:
            for item in getattr(self, attr):
                args.extend([flag, item])

        return args

    def get_meta_args(self):
        """Get a list of generated meta-arguments"""
        args = []

        if self.meta_pgp_5_compatible:
            args.extend(['--compress-algo', '1', '--force-v3-sigs'])

        if self.meta_pgp_2_compatible:
            args.append('--rfc1991')

        if not self.meta_interactive:
            args.extend(['--batch', '--no-tty'])

        return args
-
-
-
class Process:
    """Objects of this class encompass properties of a GnuPG
    process spawned by GnuPG.run().

    # gnupg is a GnuPG object
    process = gnupg.run( [ '--decrypt' ], create_fhs = [ 'stdout' ] )
    out = process.handles['stdout'].read()
    ...
    os.waitpid( process.pid, 0 )

    Data Attributes

    handles -- This is a map of filehandle-names to
    the file handles, if any, that were requested via run() and hence
    are connected to the running GnuPG process.  Valid names
    of this map are only those handles that were requested.

    pid -- The PID of the spawned GnuPG process.
    Useful to know, since one should call
    os.waitpid() to clean up the process, especially
    if multiple calls are made to run().
    """

    def __init__(self):
        self._pipes = {}
        self.handles = {}
        self.pid = None
        # Initialised here; nothing in this class reads or updates it.
        self._waited = None

    def wait(self):
        """Wait on the process to exit, allowing for child cleanup.

        Will raise an IOError if the process exits non-zero or is
        terminated by a signal.
        """
        status = os.waitpid(self.pid, 0)[1]
        if status == 0:
            return

        # A signal death leaves the exit-code byte at zero, so report the
        # signal instead of a misleading "code 0".
        if os.WIFSIGNALED(status):
            raise IOError('GnuPG was terminated by signal %d'
                          % os.WTERMSIG(status))

        raise IOError('GnuPG exited non-zero, with code %d' % (status >> 8))
-
-
-
def _run_doctests():
    """Run the doctests of the GnuPGInterface module.

    Returns whatever doctest.testmod() returns.
    """
    import doctest
    # Local import: inside this function the name refers to the module
    # imported here, not to any module-level name spelled the same way.
    import GnuPGInterface
    return doctest.testmod(GnuPGInterface)
-
# Module-level alias of the GnuPG class.
# NOTE(review): presumably kept for backward compatibility -- confirm.
GnuPGInterface = GnuPG

# Run the doctests when the file is executed as a script.
if __name__ == '__main__':
    _run_doctests()
-
-